/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.hbase.index.covered;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;

/**
 * Manage the state of the HRegion's view of the table, for the single row.
 * <p>
 * Currently, this is a single-use object - you need to create a new one for each row that you need
 * to manage. In the future, we could make this object reusable, but for the moment its easier to
 * manage as a throw-away object.
 * <p>
 * This class is <b>not</b> thread-safe - it requires external synchronization is access
 * concurrently.
 */
public class LocalTableState implements TableState {

  private long ts;
  private KeyValueStore memstore;
  private LocalHBaseState table;
  private Mutation update;
  private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
  private ScannerBuilder scannerBuilder;
  private List<Cell> kvs = new ArrayList<Cell>();
  private List<? extends IndexedColumnGroup> hints;
  private CoveredColumns columnSet;

  public LocalTableState(LocalHBaseState table, Mutation update) {
    this.table = table;
    this.update = update;
    this.memstore = new IndexMemStore();
    this.scannerBuilder = new ScannerBuilder(memstore, update);
    this.columnSet = new CoveredColumns();
  }

  public void addPendingUpdates(Cell... kvs) {
    if (kvs == null) return;
    addPendingUpdates(Arrays.asList(kvs));
  }

  public void addPendingUpdates(List<Cell> kvs) {
    if (kvs == null) return;
    setPendingUpdates(kvs);
    addUpdate(kvs);
  }

  private void addUpdate(List<Cell> list) {
    addUpdate(list, true);
  }

  private void addUpdate(List<Cell> list, boolean overwrite) {
    if (list == null) return;
    for (Cell kv : list) {
      this.memstore.add(kv, overwrite);
    }
  }

  private void addUpdateCells(List<Cell> list, boolean overwrite) {
    if (list == null) return;
    // Avoid a copy of the Cell into a KeyValue if it's already a KeyValue
    for (Cell c : list) {
      this.memstore.add(c, overwrite);
    }
  }

  @Override
  public long getCurrentTimestamp() {
    return this.ts;
  }

  /**
   * Set the current timestamp up to which the table should allow access to the underlying table.
   * This overrides the timestamp view provided by the indexer - use with care!
   * @param timestamp timestamp up to which the table should allow access.
   */
  public void setCurrentTimestamp(long timestamp) {
    this.ts = timestamp;
  }

  public void resetTrackedColumns() {
    this.trackedColumns.clear();
  }

  public Set<ColumnTracker> getTrackedColumns() {
    return this.trackedColumns;
  }

  /**
   * Get a scanner on the columns that are needed by the index.
   * <p>
   * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
   * columns with a timestamp earlier than the timestamp to which the table is currently set (the
   * current state of the table for which we need to build an update).
   * <p>
   * If none of the passed columns matches any of the columns in the pending update (as determined
   * by {@link ColumnReference#matchesFamily(byte[])} and
   * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. This
   * is because it doesn't make sense to build index updates when there is no change in the table
   * state for any of the columns you are indexing.
   * <p>
   * <i>NOTE:</i> This method should <b>not</b> be used during
   * IndexCodec#getIndexDeletes(TableState, BatchState, byte[], byte[]) as the pending update will
   * not yet have been applied - you are merely attempting to cleanup the current state and
   * therefore do <i>not</i> need to track the indexed columns.
   * <p>
   * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you
   * request - you will never see a column with the timestamp we are tracking, but the next oldest
   * timestamp for that column.
   * @param indexedColumns       the columns to that will be indexed
   * @param ignoreNewerMutations ignore mutations newer than m when determining current state.
   *                             Useful when replaying mutation state for partial index rebuild
   *                             where writes succeeded to the data table, but not to the index
   *                             table.
   * @param indexMetaData        TODO
   * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
   *         the builder. Even if no update is necessary for the requested columns, you still need
   *         to return the {@link IndexUpdate}, just don't set the update for the
   *         {@link IndexUpdate}.
   */
  public Pair<CoveredDeleteScanner, IndexUpdate> getIndexedColumnsTableState(
    Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations,
    boolean isStateForDeletes, IndexMetaData indexMetaData) throws IOException {
    // check to see if we haven't initialized any columns yet
    Collection<? extends ColumnReference> toCover =
      this.columnSet.findNonCoveredColumns(indexedColumns);

    // add the covered columns to the set
    for (ColumnReference ref : toCover) {
      this.columnSet.addColumn(ref);
    }
    boolean requiresPriorRowState = indexMetaData.requiresPriorRowState(update);
    if (!toCover.isEmpty()) {
      // no need to perform scan to find prior row values when the indexed columns are immutable, as
      // by definition, there won't be any. If we have indexed non row key columns, then we need to
      // look up the row so that we can formulate the delete of the index row correctly. We'll
      // always
      // have our "empty" key value column, so we check if we have more than that as a basis for
      // needing to lookup the prior row values.
      if (requiresPriorRowState) {
        // add the current state of the row. Uses listCells() to avoid a new array creation.
        this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations),
          false);
      }
    }

    // filter out things with a newer timestamp and track the column references to which it applies
    ColumnTracker tracker = new ColumnTracker(indexedColumns);
    synchronized (this.trackedColumns) {
      // we haven't seen this set of columns before, so we need to create a new tracker
      if (!this.trackedColumns.contains(tracker)) {
        this.trackedColumns.add(tracker);
      }
    }

    CoveredDeleteScanner scanner =
      this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts,
        // If we're determining the index state for deletes and either
        // a) we've looked up the prior row state or
        // b) we're inserting immutable data
        // then allow a null scanner to be returned.
        // FIXME: this is crappy code - we need to simplify the global mutable secondary index
        // implementation
        // TODO: use mutable transactional secondary index implementation instead
        // (PhoenixTransactionalIndexer)
        isStateForDeletes && (requiresPriorRowState || insertingData(update)));
    return new Pair<CoveredDeleteScanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
  }

  private static boolean insertingData(Mutation m) {
    for (Collection<Cell> cells : m.getFamilyCellMap().values()) {
      for (Cell cell : cells) {
        if (cell.getType() != Cell.Type.Put) {
          return false;
        }
      }
    }
    return true;
  }

  @Override
  public byte[] getCurrentRowKey() {
    return this.update.getRow();
  }

  /**
   */
  public void setHints(List<? extends IndexedColumnGroup> hints) {
    this.hints = hints;
  }

  @Override
  public List<? extends IndexedColumnGroup> getIndexColumnHints() {
    return this.hints;
  }

  @Override
  public Collection<Cell> getPendingUpdate() {
    return this.kvs;
  }

  /**
   * Set the {@link KeyValue}s in the update for which we are currently building an index update,
   * but don't actually apply them. pending {@link KeyValue}s
   */
  public void setPendingUpdates(Collection<Cell> update) {
    this.kvs.clear();
    this.kvs.addAll(update);
  }

  /**
   * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
   */
  public void applyPendingUpdates() {
    this.addUpdate(kvs);
  }

  /**
   * Rollback all the given values from the underlying state.
   */
  public void rollback(Collection<KeyValue> values) {
    for (KeyValue kv : values) {
      this.memstore.rollback(kv);
    }
  }

  @Override
  public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
    Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations,
    boolean isStateForDeletes, IndexMetaData indexMetaData) throws IOException {
    Pair<CoveredDeleteScanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns,
      ignoreNewerMutations, isStateForDeletes, indexMetaData);
    ValueGetter valueGetter =
      IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
    return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
  }
}
